// Copyright 2025 Memgraph Ltd.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
// License, and you may not use this file except in compliance with the Business Source License.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.

#include <filesystem>
#include <vector>

#include <gtest/gtest.h>

#include "dbms/database.hpp"
#include "replication/state.hpp"
#include "storage/v2/config.hpp"
#include "storage/v2/durability/paths.hpp"
#include "storage/v2/inmemory/storage.hpp"
#include "tests/test_commit_args_helper.hpp"

namespace {
class TmpDirManager final {
 public:
  explicit TmpDirManager(const std::string_view directory)
      : tmp_dir_{std::filesystem::temp_directory_path() / directory} {
    CreateDir();
  }
  ~TmpDirManager() { Clear(); }

  const std::filesystem::path &Path() const { return tmp_dir_; }

 private:
  std::filesystem::path tmp_dir_;

  void CreateDir() {
    if (!std::filesystem::exists(tmp_dir_)) {
      std::filesystem::create_directory(tmp_dir_);
    }
  }

  void Clear() {
    if (!std::filesystem::exists(tmp_dir_)) return;
    std::filesystem::remove_all(tmp_dir_);
  }
};

// Parses a snapshot name generated by MakeSnapshotName, e.g. "2024-05-30T11-22-34.923Z_timestamp_12345678"
// Returns a pair of (datetime, start_timestamp) as strings, or std::nullopt if invalid.
inline std::optional<std::pair<std::string, std::string>> ParseSnapshotName(const std::string &name) {
  static constexpr std::string_view delim = "_timestamp_";
  auto pos = name.find(delim);
  if (pos == std::string::npos) return std::nullopt;
  std::string datetime = name.substr(0, pos);
  std::string timestamp = name.substr(pos + delim.size());
  if (datetime.empty() || timestamp.empty()) return std::nullopt;
  return std::pair(datetime, timestamp);
}

}  // namespace

class RecoverSnapshotTest : public ::testing::Test {
 protected:
  void SetUp() override {
    config_.durability.storage_directory = temp_dir_.Path();
    config_.durability.snapshot_wal_mode =
        memgraph::storage::Config::Durability::SnapshotWalMode::PERIODIC_SNAPSHOT_WITH_WAL;
    config_.durability.snapshot_interval = memgraph::utils::SchedulerInterval{std::chrono::minutes{1}};
    config_.durability.wal_file_size_kibibytes = 1;
    config_.durability.wal_file_flush_every_n_tx = 1;
    config_.durability.recover_on_startup = false;
    config_.durability.snapshot_on_exit = false;
  }

  TmpDirManager temp_dir_{"MG_test_unit_storage_v2_recover_snapshot"};
  memgraph::storage::Config config_;
};

// Create some data
// Create snapshots
// Use an older snapshot to recover
// Same uuid, same data dir
// Multiple recoveries (.old should contain one backup of the dir)
// NO WALs
TEST_F(RecoverSnapshotTest, RecoverSnapshotCreatesOldDirectory) {
  // Create initial storage and add some data
  memgraph::utils::Synchronized<memgraph::replication::ReplicationState, memgraph::utils::RWSpinLock> repl_state{
      memgraph::storage::ReplicationStateRootPath(config_)};
  memgraph::dbms::Database db{config_, repl_state};
  auto *storage = static_cast<memgraph::storage::InMemoryStorage *>(db.storage());

  // Add some data
  {
    auto acc = storage->Access();
    auto vertex = acc->CreateVertex();
    ASSERT_FALSE(acc->PrepareForCommitPhase(memgraph::tests::MakeMainCommitArgs()).HasError());
  }

  // Create a snapshot
  auto snapshot_result = storage->CreateSnapshot();
  ASSERT_FALSE(snapshot_result.HasError());
  auto snapshot_path = snapshot_result.GetValue();

  // Add more data
  {
    auto acc = storage->Access();
    auto vertex = acc->CreateVertex();
    ASSERT_FALSE(acc->PrepareForCommitPhase(memgraph::tests::MakeMainCommitArgs()).HasError());
  }

  // Create another snapshot to have multiple snapshots
  auto snapshot_result2 = storage->CreateSnapshot();
  ASSERT_FALSE(snapshot_result2.HasError());

  // Verify we have snapshots in the directory
  auto snapshots_dir = config_.durability.storage_directory / memgraph::storage::durability::kSnapshotDirectory;
  ASSERT_TRUE(std::filesystem::exists(snapshots_dir));

  std::vector<std::filesystem::path> snapshot_files;
  for (const auto &entry : std::filesystem::directory_iterator(snapshots_dir)) {
    if (entry.is_regular_file()) {
      snapshot_files.push_back(entry.path());
    }
  }
  ASSERT_GE(snapshot_files.size(), 2) << "Should have at least 2 snapshot files";

  // Recover from the first snapshot
  auto recover_result =
      storage->RecoverSnapshot(snapshot_path, true, memgraph::replication_coordination_glue::ReplicationRole::MAIN);
  ASSERT_FALSE(recover_result.HasError()) << "RecoverSnapshot should succeed";

  // Verify .old directory was created
  auto old_snapshots_dir = snapshots_dir / ".old";
  ASSERT_TRUE(std::filesystem::exists(old_snapshots_dir)) << ".old directory should exist";

  // Verify .old directory contains files
  std::vector<std::filesystem::path> old_files;
  for (const auto &entry : std::filesystem::directory_iterator(old_snapshots_dir)) {
    if (entry.is_regular_file()) {
      old_files.push_back(entry.path());
    }
  }
  ASSERT_EQ(old_files.size(), 2) << ".old directory should contain files";

  // Verify current snapshots directory has only the recovered snapshot
  std::vector<std::filesystem::path> current_files;
  for (const auto &entry : std::filesystem::directory_iterator(snapshots_dir)) {
    if (entry.is_regular_file()) {
      current_files.push_back(entry.path());
    }
  }
  ASSERT_EQ(current_files.size(), 1) << "Should have exactly one snapshot file after recovery";

  // Verify the recovered snapshot has a new name
  auto new_snapshot_path = current_files[0];
  auto parsed_name = ParseSnapshotName(new_snapshot_path.filename().string());
  auto old_info = ParseSnapshotName(snapshot_path.filename().string());
  ASSERT_TRUE(parsed_name.has_value()) << "Should be able to parse the recovered snapshot name";
  ASSERT_TRUE(old_info.has_value()) << "Should be able to parse the original snapshot name";
  ASSERT_GT(parsed_name->first, old_info->first)
      << "Recovered snapshot " << new_snapshot_path << " should have the same datetime as the original snapshot";
  ASSERT_EQ(parsed_name->second, old_info->second)
      << "Recovered snapshot " << new_snapshot_path << " should have the same start timestamp as the original snapshot";

  // Verify there is only one vertex (the one created in the first transaction)
  auto acc = storage->Access();
  auto vertices = acc->Vertices(memgraph::storage::View::NEW);
  int count = 0;
  for (const auto &vertex : vertices) {
    count++;
  }
  ASSERT_EQ(count, 1) << "Should have exactly one vertex after recovery";

  // Second recovery
  auto recover2_result =
      storage->RecoverSnapshot(new_snapshot_path, true, memgraph::replication_coordination_glue::ReplicationRole::MAIN);
  ASSERT_FALSE(recover2_result.HasError()) << "RecoverSnapshot should succeed";

  // Verify .old contains the new snapshot only
  old_files.clear();
  for (const auto &entry : std::filesystem::directory_iterator(old_snapshots_dir)) {
    if (entry.is_regular_file()) {
      old_files.push_back(entry.path());
    }
  }
  ASSERT_EQ(old_files.size(), 1) << ".old directory should contain files";
  ASSERT_EQ(old_files[0].filename(), new_snapshot_path.filename()) << ".old directory should contain the new snapshot";

  // Verify current snapshots directory has only the recovered snapshot
  current_files.clear();
  for (const auto &entry : std::filesystem::directory_iterator(snapshots_dir)) {
    if (entry.is_regular_file()) {
      current_files.push_back(entry.path());
    }
  }
  ASSERT_EQ(current_files.size(), 1) << "Should have exactly one snapshot file after recovery";
  // Verify the recovered snapshot has a new name
  new_snapshot_path = current_files[0];
  parsed_name = ParseSnapshotName(new_snapshot_path.filename().string());
  ASSERT_TRUE(parsed_name.has_value()) << "Should be able to parse the recovered snapshot name";
  ASSERT_GT(parsed_name->first, old_info->first)
      << "Recovered snapshot should have a newer datetime than the original snapshot";
  ASSERT_EQ(parsed_name->second, old_info->second)
      << "Recovered snapshot should have a newer start timestamp than the original snapshot";
}

// Create some data
// Create snapshots
// Startup new storage
// Recover from the local snapshot
// Verify the data is the same
// NO WALs
TEST_F(RecoverSnapshotTest, RecoverSnapshotFromLocalStorage) {
  // Create initial storage and add some data
  memgraph::utils::Synchronized<memgraph::replication::ReplicationState, memgraph::utils::RWSpinLock> repl_state{
      memgraph::storage::ReplicationStateRootPath(config_)};
  std::optional<memgraph::dbms::Database> db = std::make_optional<memgraph::dbms::Database>(config_, repl_state);
  ASSERT_TRUE(db.has_value()) << "Database should be created";
  auto *storage = static_cast<memgraph::storage::InMemoryStorage *>(db->storage());

  // Add some data
  {
    auto acc = storage->Access();
    auto vertex = acc->CreateVertex();
    ASSERT_FALSE(acc->PrepareForCommitPhase(memgraph::tests::MakeMainCommitArgs()).HasError());
  }

  // Create a snapshot
  auto snapshot_result = storage->CreateSnapshot();
  ASSERT_FALSE(snapshot_result.HasError());

  // Move the snapshot to the local storage
  TmpDirManager local_dir{"MG_test_unit_storage_v2_recover_snapshot_local"};
  auto snapshot_path = snapshot_result.GetValue();
  std::filesystem::rename(snapshot_path, local_dir.Path() / snapshot_path.filename());
  snapshot_path = local_dir.Path() / snapshot_path.filename();

  // Restart storage
  db.reset();
  db.emplace(config_, repl_state);
  ASSERT_TRUE(db.has_value()) << "Database should be created";
  storage = static_cast<memgraph::storage::InMemoryStorage *>(db->storage());

  // Verify storage is empty
  {
    auto acc = storage->Access();
    auto vertices = acc->Vertices(memgraph::storage::View::NEW);
    int count = 0;
    for (const auto &vertex : vertices) {
      count++;
    }
    ASSERT_EQ(count, 0) << "Should have exactly zero vertices after restart";
  }

  {
    auto snapshot_result = storage->CreateSnapshot();
    ASSERT_FALSE(snapshot_result.HasError());
  }
  // Recover from the local snapshot
  auto recover_result =
      storage->RecoverSnapshot(snapshot_path, true, memgraph::replication_coordination_glue::ReplicationRole::MAIN);
  ASSERT_FALSE(recover_result.HasError()) << "RecoverSnapshot should succeed";

  // Verify storage is empty
  {
    auto acc = storage->Access();
    auto vertices = acc->Vertices(memgraph::storage::View::NEW);
    int count = 0;
    for (const auto &vertex : vertices) {
      count++;
    }
    ASSERT_EQ(count, 1) << "Should have exactly one vertex after recovery";
  }

  // Verify .old contains the old snapshot
  auto old_snapshots_dir =
      config_.durability.storage_directory / memgraph::storage::durability::kSnapshotDirectory / ".old";
  std::vector<std::filesystem::path> old_files;
  for (const auto &entry : std::filesystem::directory_iterator(old_snapshots_dir)) {
    if (entry.is_regular_file()) {
      old_files.push_back(entry.path());
    }
  }
  ASSERT_GT(old_files.size(), 0) << ".old directory should contain files";

  // Verify current snapshots directory has only the recovered snapshot
  auto current_snapshots_dir = config_.durability.storage_directory / memgraph::storage::durability::kSnapshotDirectory;
  std::vector<std::filesystem::path> current_files;
  for (const auto &entry : std::filesystem::directory_iterator(current_snapshots_dir)) {
    if (entry.is_regular_file()) {
      current_files.push_back(entry.path());
    }
  }
  ASSERT_EQ(current_files.size(), 1) << "Should have exactly one snapshot file after recovery";

  // Verify name
  auto parsed_name = ParseSnapshotName(current_files[0].filename().string());
  auto old_info = ParseSnapshotName(snapshot_path.filename().string());
  ASSERT_TRUE(parsed_name.has_value()) << "Should be able to parse the recovered snapshot name";
  ASSERT_TRUE(old_info.has_value()) << "Should be able to parse the original snapshot name";
  ASSERT_GT(parsed_name->first, old_info->first)
      << "Recovered snapshot " << current_files[0] << " should have a newer datetime than the original snapshot";
  ASSERT_EQ(parsed_name->second, old_info->second)
      << "Recovered snapshot " << current_files[0] << " should have the same start timestamp as the original snapshot";
}

// Create some data
// Create snapshots
// Create more data (generates WALs)
// Startup new storage
// Recover from the local snapshot
// Verify the data is correct
TEST_F(RecoverSnapshotTest, RecoverSnapshotWithWALs) {
  // Create initial storage and add some data
  memgraph::utils::Synchronized<memgraph::replication::ReplicationState, memgraph::utils::RWSpinLock> repl_state{
      memgraph::storage::ReplicationStateRootPath(config_)};
  std::optional<memgraph::dbms::Database> db = std::make_optional<memgraph::dbms::Database>(config_, repl_state);
  ASSERT_TRUE(db.has_value()) << "Database should be created";
  auto *storage = static_cast<memgraph::storage::InMemoryStorage *>(db->storage());

  // Add some data
  {
    auto acc = storage->Access();
    auto vertex = acc->CreateVertex();
    ASSERT_FALSE(acc->PrepareForCommitPhase(memgraph::tests::MakeMainCommitArgs()).HasError());
  }

  // Create a snapshot
  auto snapshot_result = storage->CreateSnapshot();
  ASSERT_FALSE(snapshot_result.HasError());
  auto snapshot_path = snapshot_result.GetValue();

  // Add some data
  {
    auto acc = storage->Access();
    auto vertex = acc->CreateVertex();
    ASSERT_FALSE(acc->PrepareForCommitPhase(memgraph::tests::MakeMainCommitArgs()).HasError());
  }

  // Recover from the snapshot
  auto recover_result =
      storage->RecoverSnapshot(snapshot_path, true, memgraph::replication_coordination_glue::ReplicationRole::MAIN);
  ASSERT_FALSE(recover_result.HasError()) << "RecoverSnapshot should succeed";

  // Verify storage has the snapshot data
  {
    auto acc = storage->Access();
    auto vertices = acc->Vertices(memgraph::storage::View::NEW);
    int count = 0;
    for (const auto &vertex : vertices) {
      count++;
    }
    ASSERT_EQ(count, 1) << "Should have exactly one vertex after recovery";
  }

  // Verify .old contains the old snapshot
  auto old_snapshots_dir =
      config_.durability.storage_directory / memgraph::storage::durability::kSnapshotDirectory / ".old";
  std::vector<std::filesystem::path> old_files;
  for (const auto &entry : std::filesystem::directory_iterator(old_snapshots_dir)) {
    if (entry.is_regular_file()) {
      old_files.push_back(entry.path());
    }
  }
  ASSERT_GT(old_files.size(), 0) << ".old directory should contain files";

  // Verify .old contains the old snapshot
  auto old_wals_dir = config_.durability.storage_directory / memgraph::storage::durability::kWalDirectory / ".old";
  std::vector<std::filesystem::path> old_wals;
  for (const auto &entry : std::filesystem::directory_iterator(old_wals_dir)) {
    if (entry.is_regular_file()) {
      old_wals.push_back(entry.path());
    }
  }
  ASSERT_GT(old_wals.size(), 0) << ".old directory should contain files";

  // Verify current snapshots directory has only the recovered snapshot
  auto current_snapshots_dir = config_.durability.storage_directory / memgraph::storage::durability::kSnapshotDirectory;
  std::vector<std::filesystem::path> current_files;
  for (const auto &entry : std::filesystem::directory_iterator(current_snapshots_dir)) {
    if (entry.is_regular_file()) {
      current_files.push_back(entry.path());
    }
  }
  ASSERT_EQ(current_files.size(), 1) << "Should have exactly one snapshot file after recovery";

  // Verify name
  auto parsed_name = ParseSnapshotName(current_files[0].filename().string());
  auto old_info = ParseSnapshotName(snapshot_path.filename().string());
  ASSERT_TRUE(parsed_name.has_value()) << "Should be able to parse the recovered snapshot name";
  ASSERT_TRUE(old_info.has_value()) << "Should be able to parse the original snapshot name";
  ASSERT_GT(parsed_name->first, old_info->first)
      << "Recovered snapshot " << current_files[0] << " should have a newer datetime than the original snapshot";
  ASSERT_EQ(parsed_name->second, old_info->second)
      << "Recovered snapshot " << current_files[0] << " should have the same start timestamp as the original snapshot";

  // Verify current wals directory is empty
  {
    auto current_wals_dir = config_.durability.storage_directory / memgraph::storage::durability::kWalDirectory;
    std::vector<std::filesystem::path> current_wals;
    for (const auto &entry : std::filesystem::directory_iterator(current_wals_dir)) {
      if (entry.is_regular_file()) {
        current_wals.push_back(entry.path());
      }
    }
    ASSERT_EQ(current_wals.size(), 0) << "Should have exactly zero WAL files after recovery";
  }

  // Add more data (generates more WALs)
  {
    auto acc = storage->Access();
    auto vertex = acc->CreateVertex();
    ASSERT_FALSE(acc->PrepareForCommitPhase(memgraph::tests::MakeMainCommitArgs()).HasError());
  }

  // Verify WAL was created
  {
    auto current_wals_dir = config_.durability.storage_directory / memgraph::storage::durability::kWalDirectory;
    std::vector<std::filesystem::path> current_wals;
    for (const auto &entry : std::filesystem::directory_iterator(current_wals_dir)) {
      if (entry.is_regular_file()) {
        current_wals.push_back(entry.path());
      }
    }
    ASSERT_GT(current_wals.size(), 0) << "Should have exactly one WAL file after recovery";
  }
}
